feat: Sessions primitive — durable run-aware streams + dashboard (1/5)#3542
feat: Sessions primitive — durable run-aware streams + dashboard (1/5)#3542ericallam wants to merge 1 commit into
Conversation
🦋 Changeset detectedLatest commit: ed7bf97 The changes in this PR will be included in the next version bump. This PR includes changesets to release 29 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
WalkthroughAdds sessions list/detail routes, presenters, SSE endpoints, and a close-session action. Introduces sessionStreams API (standard/noop/test managers), SessionStreamInstance, and core realtime exports. Reworks SSE client retry/backoff, fetch/stall timeouts, and auth handling. Adds S2 StreamsWriter size checks and StreamWriteResult. Introduces AGENT trigger source, propagates taskKind through annotations, repositories, ClickHouse, and run presenters. Adds DB and ClickHouse migrations, path builders, mintRunToken helper, rate-limit whitelist, and multiple test suites. Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
|
There was a problem hiding this comment.
🔴 getTaskQueueInfo early-return path omits taskKind, misclassifying AGENT/SCHEDULED runs as STANDARD
When getTaskQueueInfo is called for a non-locked-worker trigger that provides both a queue override and a per-trigger TTL, the early return at line 219 skips the DB query entirely and returns { queueName, taskTtl: undefined } without taskKind. The caller in triggerTask.server.ts:316 then falls through to taskKind ?? "STANDARD", permanently stamping AGENT or SCHEDULED runs as STANDARD in the ClickHouse task_kind column.
This directly contradicts the PR's own intent — the comment block at queues.server.ts:110-116 on the locked-worker path explicitly says "Always fetch the task so we can resolve triggerSource" to avoid exactly this bug. But the non-locked-worker path has the same early-return optimization from before the PR that was never updated to also fetch triggerSource.
Affected scenario: any AGENT task triggered via the API/SDK with queue: { name: "custom" } and ttl: "5m" (both set). The run will appear as "Standard" in the dashboard's Source filter and won't match a sources=["AGENT"] filter.
(Refers to lines 218-219)
Prompt for agents
In `getTaskQueueInfo` at apps/webapp/app/runEngine/concerns/queues.server.ts, the early-return at line 218-219 fires when both `overriddenQueueName` and `body.options?.ttl` are set and skips the DB query entirely — returning no `taskKind`. This means AGENT and SCHEDULED runs hitting this path get misclassified as STANDARD in ClickHouse.
The fix should either:
1. Remove this early-return optimization entirely (simplest — the subsequent DB queries are cheap and already cached by the replica). The locked-worker path at line 110-129 already does this: it always fetches the task row.
2. Or keep the optimization but still query `backgroundWorkerTask.findFirst` for just `triggerSource` so `taskKind` is populated. This is what the locked-worker path does at lines 117-129.
The same issue affects the `no worker found` fallback at line 231 (`return { queueName: overriddenQueueName ?? defaultQueueName, taskTtl: undefined }`) and the `no task found` fallback at line 265 — though those are edge cases where there's no task row to look up. Still, the line-218 path is the common case and must be fixed.
Was this helpful? React with 👍 or 👎 to provide feedback.
There was a problem hiding this comment.
Actionable comments posted: 12
Note
Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.
🟡 Minor comments (11)
packages/core/src/v3/test/test-realtime-streams-manager.ts-158-161 (1)
158-161:⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
reset()leakswriteListenersacross tests.
reset()clearsbuffersandpipeWaitsbut leaveswriteListenersintact. For a manager whose primary purpose is per-test isolation, any listener registered in one test (and not explicitly unsubscribed) will continue to fire for writes in subsequent tests that share this instance, leading to flaky/cross-talking tests.🧹 Proposed fix
reset(): void { this.buffers.clear(); this.pipeWaits.clear(); + this.writeListeners.clear(); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/test/test-realtime-streams-manager.ts` around lines 158 - 161, The reset() method currently clears buffers and pipeWaits but leaves writeListeners populated, causing listeners to leak between tests; update the reset() implementation (the reset() method in the test realtime streams manager) to also remove all registered write listeners (e.g., clear or reinitialize the writeListeners collection) so any callbacks registered via writeListeners are unsubscribed/cleared between tests to ensure per-test isolation.packages/core/src/v3/apiClient/runStream.ts-480-491 (1)
480-491:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winAbort during backoff is not wakeable
The retry delay promise only listens to
retryNowController. If callersignalaborts during backoff, shutdown can be delayed until the timer expires.Suggested fix
this.retryNowController = new AbortController(); await new Promise<void>((resolve) => { + if (this.options.signal?.aborted) { + resolve(); + return; + } const timer = setTimeout(() => { this.retryNowController?.signal.removeEventListener("abort", onAbort); + this.options.signal?.removeEventListener("abort", onUserAbort); resolve(); }, delay); const onAbort = () => { clearTimeout(timer); + this.options.signal?.removeEventListener("abort", onUserAbort); + resolve(); + }; + const onUserAbort = () => { + clearTimeout(timer); + this.retryNowController?.signal.removeEventListener("abort", onAbort); resolve(); }; this.retryNowController!.signal.addEventListener("abort", onAbort, { once: true }); + this.options.signal?.addEventListener("abort", onUserAbort, { once: true }); });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/apiClient/runStream.ts` around lines 480 - 491, The backoff promise only listens to this.retryNowController and ignores the caller's abort signal, causing shutdown to wait for the timer; update the wait logic in runStream.ts to also listen to the external/caller signal (e.g., this.signal or the passed-in signal) alongside this.retryNowController by adding an event listener on that signal which clears the timeout, resolves the promise, and removes both listeners (mirror the existing onAbort cleanup for retryNowController.signal and the timer) to ensure immediate wake-up on caller abort while preventing listener leaks.packages/core/src/v3/inputStreams/manager.ts-181-188 (1)
181-188:⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
disconnectStreammay be silently undone by auto-reconnect when handlers or waiters still exist.The tail's
.finallyblock auto-reconnects wheneverthis.handlersorthis.onceWaitersstill have entries forstreamId. If any.on()handler or pending.once()waiter is active whendisconnectStreamruns, the abort will complete but the tail immediately reconnects—negating the contract: "disconnect before.wait()suspends so the tail doesn't buffer duplicates delivered through the waitpoint path."Add an
explicitlyDisconnectedflag (as already implemented insessionStreams/manager.ts) to prevent auto-reconnect after intentional disconnect. Mark the stream before abort, check the flag in the.finallyreconnect branch, and clear it on the nexton()/once()call.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/inputStreams/manager.ts` around lines 181 - 188, The disconnectStream implementation currently aborts and removes the tail but can be immediately undone by the tail's .finally auto-reconnect if handlers or onceWaiters still reference streamId; add an explicitlyDisconnected flag (same pattern used in sessionStreams/manager.ts) to the manager: set explicitlyDisconnected[streamId] = true before calling tail.abortController.abort() in disconnectStream, check explicitlyDisconnected inside the tail's .finally reconnect branch to skip reconnect when true, and ensure explicitlyDisconnected[streamId] is cleared when a new .on() or .once() call for that streamId occurs so reconnects resume normally.packages/core/src/v3/test/test-session-stream-manager.ts-215-247 (1)
215-247:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winHandler invocation in
__sendFromTestdoesn't use the safeinvokehelper — and the helper itself is dead.
__sendFromTestcalls handlers directly viaArray.from(handlers).map((h) => Promise.resolve().then(() => h(data)))(Line 225). A thrown/rejected handler will reject thePromise.alland propagate out of__sendFromTest, which is the opposite of the comment on Line 278 ("Never let a handler error break test state"). Meanwhile, theinvokeprivate method defined on Lines 270-279 (which does exactly that catch) is never referenced anywhere in the file — dead code.Either drop
invoke, or route handler dispatch through it so a misbehaving handler doesn't blow up the test harness.🛡️ Suggested fix
const handlers = this.handlers.get(key); if (handlers && handlers.size > 0) { - await Promise.all( - Array.from(handlers).map((h) => Promise.resolve().then(() => h(data))) - ); + await Promise.all( + Array.from(handlers).map(async (h) => { + try { + await h(data); + } catch { + // Never let a handler error break test state + } + }) + ); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/test/test-session-stream-manager.ts` around lines 215 - 247, The handler loop in __sendFromTest currently calls handlers directly with Promise.all(Array.from(handlers).map((h) => Promise.resolve().then(() => h(data)))) so a thrown/rejected handler will reject the whole send; instead reuse the existing private invoke helper (or restore its implementation) to swallow or handle handler errors as intended. Replace the direct handler invocation in __sendFromTest to call this.invoke(h, data) for each handler (i.e., Promise.resolve().then(() => this.invoke(h, data)) or map to this.invoke) so individual handler failures are caught and do not break test state; if invoke is currently dead/incorrect, fix its implementation to catch errors (and optionally log) and return a resolved promise. Ensure the change references the handlers map and the invoke method so behavior is consistent with the "Never let a handler error break test state" comment.packages/core/src/v3/sessionStreams/manager.ts-121-145 (1)
121-145:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winAbort listener and timeout handle leak on normal
once()resolution.When a
once()waiter resolves via#dispatch(Line 359), neither thesignal'sabortlistener nor the unused timeout handle are removed/cleared. Theabortlistener is registered with{ once: true }, so it self-clears if the signal eventually fires, but in the common case where the signal outlives thisonce()(e.g., a long-lived run-levelAbortSignalreused across manyonce()calls), the listener — which captures thewaiterandthis— accumulates on the signal until it finally aborts.#dispatchshould also clear the timeout handle (it currently only clears it on Line 358 for the leading waiter viaclearTimeout) — wait, it does — but it never invokessignal.removeEventListener.Compare with
TestSessionStreamManagerwhich keepsabortHandleron the waiter and removes it on dispatch (packages/core/src/v3/test/test-session-stream-manager.ts, Lines 234-236).🛡️ Suggested fix
type OnceWaiter = { resolve: (result: InputStreamOnceResult<unknown>) => void; reject: (error: Error) => void; timeoutHandle?: ReturnType<typeof setTimeout>; + signal?: AbortSignal; + abortHandler?: () => void; };return new InputStreamOncePromise<unknown>((resolve, reject) => { const waiter: OnceWaiter = { resolve, reject }; if (options?.signal) { if (options.signal.aborted) { reject(new Error("Aborted")); return; } - options.signal.addEventListener( - "abort", - () => { - if (waiter.timeoutHandle) clearTimeout(waiter.timeoutHandle); - this.#removeOnceWaiter(key, waiter); - reject(new Error("Aborted")); - }, - { once: true } - ); + const abortHandler = () => { + if (waiter.timeoutHandle) clearTimeout(waiter.timeoutHandle); + this.#removeOnceWaiter(key, waiter); + reject(new Error("Aborted")); + }; + waiter.signal = options.signal; + waiter.abortHandler = abortHandler; + options.signal.addEventListener("abort", abortHandler, { once: true }); }And in
#dispatch(and the buffered-shift path) clear the listener when the waiter is resolved:const waiter = waiters.shift()!; if (waiters.length === 0) this.onceWaiters.delete(key); if (waiter.timeoutHandle) clearTimeout(waiter.timeoutHandle); + if (waiter.signal && waiter.abortHandler) { + waiter.signal.removeEventListener("abort", waiter.abortHandler); + } waiter.resolve({ ok: true, output: data });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/sessionStreams/manager.ts` around lines 121 - 145, The once() waiter installs an abort listener and timeout but does not remove the abort listener when the waiter is resolved via `#dispatch`, causing listener/closure leaks; update once() to store the abort handler on the waiter (e.g., waiter.abortHandler) when adding options.signal.addEventListener, and in `#dispatch` (and any buffered-shift resolution path) call options.signal.removeEventListener("abort", waiter.abortHandler) and clearTimeout(waiter.timeoutHandle) when resolving the waiter so both the listener and timeout are cleaned up; reference the waiter object used in once() and the `#dispatch` method to apply these removals.apps/webapp/app/services/runsRepository/runsRepository.server.ts-45-46 (1)
45-46:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winConstrain
taskKindsto known values.Line 45 currently accepts any string, so invalid values can silently flow into filtering and produce confusing empty results. Tightening this to known task kinds improves input correctness.
Suggested diff
const RunStatus = z.enum(Object.values(TaskRunStatus) as [TaskRunStatus, ...TaskRunStatus[]]); +const TaskKind = z.enum(["TASK", "AGENT", "SCHEDULED"]); const RunListInputOptionsSchema = z.object({ @@ - taskKinds: z.array(z.string()).optional(), + taskKinds: z.array(TaskKind).optional(), });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/webapp/app/services/runsRepository/runsRepository.server.ts` around lines 45 - 46, The taskKinds schema currently allows any string (taskKinds: z.array(z.string()).optional()), which lets invalid values pass; change it to constrain to the known task kinds (e.g., replace z.string() with z.enum([...]) or z.nativeEnum(TaskKind) referencing your central TaskKind enum) so only valid task kind values are accepted; update imports to bring in the TaskKind enum or explicitly list allowed strings and adjust any callers/types if needed.apps/webapp/app/routes/resources.sessions.$sessionParam.close.ts-10-14 (1)
10-14:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winReuse the shared close-session schema here.
This local schema has already drifted from the public contract: it accepts arbitrarily long
reasonstrings, whileCloseSessionRequestBodycaps them at 256 chars. Import the shared schema (or at least mirror its limit) so the dashboard and API reject the same payloads.As per coding guidelines:
apps/webapp/**/*.{ts,tsx}should use subpath exports from@trigger.dev/corepackage instead of importing from the root@trigger.dev/corepath.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/webapp/app/routes/resources.sessions`.$sessionParam.close.ts around lines 10 - 14, The local closeSessionSchema has drifted from the shared CloseSessionRequestBody contract (it allows unlimited reason length); update closeSessionSchema to match the shared schema by either importing the shared CloseSessionRequestBody schema from the `@trigger.dev/core` subpath export and using it directly, or at minimum constrain reason to z.string().max(256). Ensure the import uses a subpath export from `@trigger.dev/core` (e.g. import { CloseSessionRequestBody } from '@trigger.dev/core/…') rather than importing from the package root, and replace or remove the local schema definition accordingly.apps/webapp/app/routes/realtime.v1.sessions.$session.$io.records.ts-18-23 (1)
18-23:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winReject oversized cursors before coercing them to
Number.
afterEventIdcurrently accepts any digit string viaz.string().regex(/^\d+$/), but line 74 coerces it unsafely withNumber(searchParams.afterEventId). Long inputs can becomeInfinityor lose precision, causing the endpoint to skip/duplicate records or pass an invalid cursor downstream. TightenSearchSchemato validate finite safe integers before conversion.Suggested fix
const SearchSchema = z.object({ // S2 sequence number — same cursor format as the SSE Last-Event-ID // (the SSE `id:` field on session-channel events is the seq_num, // stringified). Records returned have `seqNum > afterEventId`. - afterEventId: z.string().regex(/^\d+$/).optional(), + afterEventId: z + .string() + .regex(/^\d+$/) + .refine((value) => Number.isSafeInteger(Number(value)), { + message: "afterEventId must be a safe integer", + }) + .optional(), });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/webapp/app/routes/realtime.v1.sessions`.$session.$io.records.ts around lines 18 - 23, SearchSchema currently allows any digit string for afterEventId but the code later does Number(searchParams.afterEventId) (see usage of Number in the handler), which can produce Infinity or lose precision for oversized values; update SearchSchema to reject values outside JavaScript safe integer range (or use a z.preprocess that coerces the string to a number and validates Number.isFinite() && Number.isSafeInteger() and >= 0) so only finite safe non-negative integers are accepted before conversion, then replace the unsafe Number(...) usage with the validated numeric value from the parsed schema (keep references to SearchSchema and afterEventId and the Number(...) conversion to locate the change).apps/webapp/app/components/runs/v3/TaskRunsTable.tsx-357-361 (1)
357-361:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winRemove the unsafe type cast for
taskKind.The
taskKindfield is typed asTaskKind, which allows arbitrary strings via.or(anyString)in the Zod schema, whileTaskTriggerSourceis limited to three specific enum values:STANDARD,SCHEDULED, andAGENT. The cast bypasses type safety and could pass invalid values toTaskTriggerSourceIcon. Either narrow theTaskKindschema to exclude arbitrary strings, or validate the value before casting.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/webapp/app/components/runs/v3/TaskRunsTable.tsx` around lines 357 - 361, The code unsafely casts run.taskKind to TaskTriggerSource before passing it to TaskTriggerSourceIcon; instead validate or narrow the value: check that run.taskKind is one of the TaskTriggerSource enum values (e.g., compare against Object.values(TaskTriggerSource) or use a helper isTaskTriggerSource) and only pass it when valid, otherwise pass a safe fallback (or undefined) or render a default icon; alternatively tighten the TaskKind schema to only allow the TaskTriggerSource union so no cast is needed. Ensure references: TaskRunsTable.tsx, run.taskKind, TaskTriggerSource, TaskTriggerSourceIcon, and TaskKind.apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam.realtime.v1.$io.ts-61-69 (1)
61-69:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winReject zero and partially-numeric
Timeout-Secondsvalues.Line 63 uses
parseInt(), so values like"10foo"are accepted, and Line 66’s truthy guard lets"0"through even though this route documents a1..600range.Suggested fix
- const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds") ?? undefined; - const timeoutInSeconds = timeoutInSecondsRaw ? parseInt(timeoutInSecondsRaw) : undefined; + const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds") ?? undefined; + const timeoutInSeconds = + timeoutInSecondsRaw === undefined ? undefined : Number(timeoutInSecondsRaw); - if ( - timeoutInSeconds && - (isNaN(timeoutInSeconds) || timeoutInSeconds < 1 || timeoutInSeconds > 600) - ) { + if ( + timeoutInSeconds !== undefined && + (!Number.isInteger(timeoutInSeconds) || timeoutInSeconds < 1 || timeoutInSeconds > 600) + ) { return new Response("Invalid timeout", { status: 400 }); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/webapp/app/routes/resources.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam.realtime.v1.$io.ts around lines 61 - 69, The timeout parsing currently uses parseInt on timeoutInSecondsRaw (from the "Timeout-Seconds" header) which accepts partially-numeric strings like "10foo" and treats "0" as falsy; change the validation in the timeoutInSeconds calculation (and the subsequent if) to first verify the header matches a pure integer regex (e.g. /^\d+$/) before parsing, then parse to a number (Number or parseInt) and explicitly reject 0 and values outside 1..600; update the checks around timeoutInSecondsRaw/timeoutInSeconds and the error return to ensure partially-numeric and "0" values return 400.apps/webapp/app/components/runs/v3/RunFilters.tsx-194-196 (1)
194-196:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winValidate
sourcesagainst the supported trigger-source set.Right now
sourcesaccepts any non-empty string, so?sources=foosurvives parsing, shows up as an applied filter, and gets passed downstream even though this UI only supportsSTANDARD,SCHEDULED, andAGENT.Suggested fix
+const RunSource = z.enum(["STANDARD", "SCHEDULED", "AGENT"]); + export const TaskRunListSearchFilters = z.object({ cursor: z.string().optional().describe("Cursor for pagination - used internally for navigation"), direction: z .enum(["forward", "backward"]) .optional() @@ - sources: StringOrStringArray.describe( + sources: z + .preprocess((value) => { + if (typeof value === "string") { + return value.length > 0 ? [value] : undefined; + } + + if (Array.isArray(value)) { + return value.filter((v) => typeof v === "string" && v.length > 0); + } + + return undefined; + }, RunSource.array().optional()) + .describe( "Task trigger sources to filter by (STANDARD, SCHEDULED, AGENT)" - ), + ), });As per coding guidelines,
{packages/core,apps/webapp}/**/*.{ts,tsx}: Use zod for validation in packages/core and apps/webapp.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/webapp/app/components/runs/v3/RunFilters.tsx` around lines 194 - 196, Replace the loose StringOrStringArray.describe for the "sources" field with a zod-based schema that only allows the supported trigger sources: "STANDARD", "SCHEDULED", "AGENT". Specifically, change the "sources" schema in RunFilters.tsx to use zod.enum (or z.union of z.string and z.array(z.enum(...)) if you need single-or-array behavior) instead of StringOrStringArray.describe, and ensure parsing/validation uses that zod schema so queries like ?sources=foo are rejected/normalized before becoming an applied filter or passed downstream.
🧹 Nitpick comments (13)
packages/core/src/v3/test/test-realtime-streams-manager.ts (1)
22-22: 💤 Low value
pipeWaitsis write-only dead state.
pipeWaitsis populated inpipe()(lines 83-84) and cleared inreset()(line 160), but nothing ever reads from it. Either remove it, or expose thewaitAll(key?)helper this state seems to anticipate (useful for tests that fan out multiplepipe()calls and want to await them collectively without holding each returned instance).♻️ Option A — drop the unused field
- private pipeWaits = new Map<string, Promise<void>[]>(); private writeListeners = new Set<WriteListener>();- if (!this.pipeWaits.has(key)) this.pipeWaits.set(key, []); - this.pipeWaits.get(key)!.push(done); - return {reset(): void { this.buffers.clear(); - this.pipeWaits.clear(); }Option B — keep it and add a
__waitAllFromTest(key?)helper that awaits the trackeddonepromises so tests can drain before assertions.Also applies to: 83-84, 160-160
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/test/test-realtime-streams-manager.ts` at line 22, The pipeWaits Map field is never read (populated in pipe() and cleared in reset()) so either remove pipeWaits entirely or implement a test-facing waiter so it’s useful; to fix, choose one: (A) delete the private pipeWaits property and any code that pushes into it inside pipe() and reset(), or (B) keep pipeWaits and add a method like __waitAllFromTest(key?: string) / waitAll(key?: string) that returns Promise<void> which awaits Promise.all(this.pipeWaits.get(key) || []) and clears the stored array, and update reset() to use that helper to drain entries; reference the pipeWaits field, the pipe() method where entries are added, and the reset() method where they’re cleared when making the change.packages/core/src/v3/test/test-input-stream-manager.ts (1)
117-121: ⚡ Quick winTest manager's
shiftBuffer/disconnectStreamdiverge from real manager semantics.
shiftBufferalways returnsfalseanddisconnectStreamis a complete no-op, but the realStandardInputStreamManagershifts/clears the buffered head used byonce(). SincependingSendsplays exactly that role in this test fixture (drained byonce()at lines 62–68), tests that exercise.wait()-style flows (drop-the-duplicate / disconnect-before-suspend) will not observe correct behavior here.Aligning the stubs would make tests faithfully reproduce production semantics.
♻️ Suggested alignment
- shiftBuffer(_streamId: string): boolean { - return false; - } - - disconnectStream(_streamId: string): void {} + shiftBuffer(streamId: string): boolean { + const buffered = this.pendingSends.get(streamId); + if (buffered && buffered.length > 0) { + buffered.shift(); + if (buffered.length === 0) this.pendingSends.delete(streamId); + return true; + } + return false; + } + + disconnectStream(streamId: string): void { + this.pendingSends.delete(streamId); + }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/test/test-input-stream-manager.ts` around lines 117 - 121, The test fixture's shiftBuffer and disconnectStream do not mimic production semantics: update shiftBuffer(streamId) to remove/shift the buffered head from the pendingSends structure for that stream and return true when it actually removed an entry (false otherwise), and update disconnectStream(streamId) to clear any buffered entries in pendingSends for that stream (and/or call shiftBuffer repeatedly) so that the once() consumer behavior that drains pendingSends is reproduced; reference the shiftBuffer and disconnectStream methods and the pendingSends collection and once() consumer when making the changes.packages/core/src/v3/sessionStreams/types.ts (1)
24-76: 💤 Low valuePrefer
typealias overinterfacefor the newSessionStreamManagerdeclaration.Per the repository's TypeScript guideline, types are preferred over interfaces. Since this is a brand-new declaration in a new file, it's a good opportunity to follow the guideline (the pre-existing
InputStreamManagerinterface can be migrated separately).♻️ Proposed change
-export interface SessionStreamManager { - /** Register a handler that fires every time data arrives on the given channel. */ - on( - sessionId: string, - io: SessionChannelIO, - handler: (data: unknown) => void | Promise<void> - ): { off: () => void }; - ... -} +export type SessionStreamManager = { + /** Register a handler that fires every time data arrives on the given channel. */ + on( + sessionId: string, + io: SessionChannelIO, + handler: (data: unknown) => void | Promise<void> + ): { off: () => void }; + ... +};As per coding guidelines: "Use types over interfaces for TypeScript".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/sessionStreams/types.ts` around lines 24 - 76, The declaration for SessionStreamManager should be converted from an interface to a type alias per repository TypeScript guidelines: replace the "interface SessionStreamManager { ... }" with "type SessionStreamManager = { ... }" preserving all member signatures (on, once, peek, lastSeqNum, setLastSeqNum, setMinTimestamp, shiftBuffer, disconnectStream, clearHandlers, reset, disconnect) and exported name; no API or runtime behavior should change, just the syntactic form (leave InputStreamManager untouched for separate migration).packages/core/src/v3/sessionStreams/manager.ts (1)
137-145: 💤 Low value
timeoutMs: 0silently disables the timeout.The truthy check on Line 137 treats
timeoutMs: 0as "no timeout", butTestSessionStreamManager.once(test-session-stream-manager.tsLine 102) usesoptions?.timeoutMs !== undefinedand would arm a 0ms timer that resolves immediately. The two implementations therefore disagree on0. If0is not a meaningful "fire immediately" value here, leave a comment; otherwise switch to an explicit!== undefinedcheck for parity.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/sessionStreams/manager.ts` around lines 137 - 145, The current truthy check in the once waiter (where waiter.timeoutHandle is set) treats timeoutMs: 0 as unset and thus disables the timer; align behavior with TestSessionStreamManager.once by checking explicitly for undefined (use options.timeoutMs !== undefined) so a 0ms timeout is honored; update the condition around waiter.timeoutHandle and ensure the timeout callback still uses options.timeoutMs (and constructs the InputStreamTimeoutError(key, options.timeoutMs!)) and leaves `#removeOnceWaiter` and resolve logic unchanged.packages/core/src/v3/test/test-session-stream-manager.ts (2)
144-146: 💤 Low value
setLastSeqNumis unconditionally monotonic in production but not here.
StandardSessionStreamManager.setLastSeqNumonly advances the stored seqNum whenseqNum > current(packages/core/src/v3/sessionStreams/manager.tsLines 168-171). The test manager overwrites unconditionally, which can mask a class of regressions where production-side regression is the whole point of the test (e.g., resume-after-reconnect with out-of-order acks). Consider matching production semantics unless there's a specific reason to diverge.♻️ Suggested fix
setLastSeqNum(sessionId: string, io: SessionChannelIO, seqNum: number): void { - this.seqNums.set(keyFor(sessionId, io), seqNum); + const key = keyFor(sessionId, io); + const current = this.seqNums.get(key); + if (current === undefined || seqNum > current) { + this.seqNums.set(key, seqNum); + } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/test/test-session-stream-manager.ts` around lines 144 - 146, The test implementation of setLastSeqNum currently overwrites seqNums unconditionally; change it to match production behavior in StandardSessionStreamManager by only updating this.seqNums for keyFor(sessionId, io) when the incoming seqNum is greater than the existing value (i.e., retrieve current via this.seqNums.get(...), compare and only call this.seqNums.set(...) if seqNum > current), preserving monotonic advancement semantics used for resume-after-reconnect/out-of-order ack tests.
197-214: 💤 Low valueDocstring misstates production behavior.
The comment on Lines 207-213 says "Production discards records that only match handlers — but in production the SSE tail introduces enough latency that the next
.onceis usually registered before the next record arrives." Looking atStandardSessionStreamManager.#dispatch(packages/core/src/v3/sessionStreams/manager.tsLines 364-378), production also buffers the record when there's no waiter (handlers fire and buffer is appended). The test manager's behavior actually mirrors production here; the only deviation is that production buffers fromonPart, not from a test-driver entry point. Update the comment so future readers don't get a misleading mental model.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/test/test-session-stream-manager.ts` around lines 197 - 214, The docstring in test-session-stream-manager.ts incorrectly claims production discards records that only match handlers; update the comment to reflect that StandardSessionStreamManager.#dispatch also appends to the buffer when no waiter is present (handlers fire and the record is buffered), and clarify that the real difference is where buffering is triggered (production buffers from onPart while the test manager buffers from the test-driver entry point). Edit the block describing dispatch rules (the paragraph starting with "If no waiter, the record is buffered...") to state this corrected behavior and mention the actual deviation between production and tests (buffer origin), referencing the TestSessionStreamManager and StandardSessionStreamManager.#dispatch to guide readers.packages/core/src/v3/sessionStreams/manager.test.ts (2)
101-134: 💤 Low valueTest name vs. assertion is slightly off; consider tightening.
The test claims it verifies that a filter on
"in"doesn't bleed into"out", but the firstmanageronly ever asserts on"out"and never proves the"in"filter on that same manager (Line 113 receives{ kind: "in-record" }, which the singleShot mock delivers to whichever io subscribes first). The actual filter assertion uses a second manager (manager2) with separate state (Lines 120-127). The proof you want — same manager, same session, different io, only one is filtered — isn't actually exercised. Consider building one mock that routes records per io (e.g., emit different fixtures on different_iocalls) so a single manager can demonstrate the per-io isolation end-to-end.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/sessionStreams/manager.test.ts` around lines 101 - 134, The test currently proves filtering on "in" using a separate manager (manager2) instead of demonstrating per-(sessionId, io) isolation on the same StandardSessionStreamManager; update the test to use a single StandardSessionStreamManager and a single mock API that emits different fixture records depending on the requested _io so that the same manager subscribes to both "in" and "out" and you can call setMinTimestamp(sessionId, "in", 5000) then assert once(sessionId, "out") returns the expected record while once(sessionId, "in") times out; modify or replace singleShotApiClient to inspect the _io argument and return distinct records for "in" vs "out", then keep using manager.disconnectStream/manager.disconnect to clean up.
13-44: 💤 Low valueMock cast bypasses
ApiClienttype checking.
as unknown as ApiClient(Line 43) silently absorbs any breaking change to theApiClientshape, and the inner cast on Line 41 papers over the async-generator vsAsyncIterableStreammismatch. IfsubscribeToSessionStreamever gains required surface (or its return type changes), these tests will keep compiling while the manager breaks at runtime against the real client. Consider typing the partial mock asPick<ApiClient, "subscribeToSessionStream">and avoiding the outer double-cast — the manager only depends on that one method.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/sessionStreams/manager.test.ts` around lines 13 - 44, The mock currently double-casts to ApiClient (singleShotApiClient) which hides shape mismatches; change the mock's type to return Pick<ApiClient, "subscribeToSessionStream"> instead of ApiClient and remove the outer `as unknown as ApiClient` cast; ensure the subscribeToSessionStream method signature and its returned async-iterable value match ApiClient's actual return type (use ReturnType<ApiClient["subscribeToSessionStream"]> / Awaited<...> for the function return) so you only need to satisfy the single method the manager uses (subscribeToSessionStream) and avoid masking future API shape changes; keep SSEStreamPart usage and the delivered logic intact.packages/core/src/v3/realtimeStreams/streamsWriterV2.test.ts (1)
5-16: ⚡ Quick winAssert
lastEventIdon the success path.The new contract here is
StreamWriteResult, but this test only proves thatwait()resolves. Ifwait()regresses back to{}orundefined, this suite still passes. SeedlastAckedPosition()with a knownseqNumand assert the returned{ lastEventId }.Also applies to: 132-148
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/realtimeStreams/streamsWriterV2.test.ts` around lines 5 - 16, The test currently stubs lastAckedPosition and appendSession but never asserts the StreamWriteResult.lastEventId; update the mock for lastAckedPosition (e.g., have lastAckedPosition return a known seqNum/object) and add an assertion after calling wait() that the resolved value includes lastEventId equal to that known seqNum; apply the same change to the second occurrence referenced (the block around lines 132-148) so both success-path tests verify lastEventId from lastAckedPosition via the appendSession mock and wait() result.packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts (2)
45-50: 💤 Low valueBest practice: Specify radix parameter for parseInt.
Lines 46 and 49 call
parseIntwithout the radix parameter. While the header values should be base-10 strings, explicitly passing10as the radix prevents potential issues and follows best practices.♻️ Add radix parameter
const flushIntervalMs = headers["x-s2-flush-interval-ms"] - ? parseInt(headers["x-s2-flush-interval-ms"]) + ? parseInt(headers["x-s2-flush-interval-ms"], 10) : undefined; const maxRetries = headers["x-s2-max-retries"] - ? parseInt(headers["x-s2-max-retries"]) + ? parseInt(headers["x-s2-max-retries"], 10) : undefined;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts` around lines 45 - 50, The parseInt calls used to parse headers["x-s2-flush-interval-ms"] and headers["x-s2-max-retries"] (producing flushIntervalMs and maxRetries) should specify a radix to avoid ambiguous parsing; update those parseInt invocations to pass 10 as the second argument (e.g., parseInt(..., 10)) so the header strings are parsed explicitly as base-10 integers.
29-38: 💤 Low valueOptional: Remove redundant optional chaining.
Line 37 uses
this.options?.requestOptions, butoptionsis a required constructor parameter and cannot be undefined. The?.operator can be simplified to..♻️ Simplify optional chaining
const response = await this.options.apiClient.initializeSessionStream( this.options.sessionId, this.options.io, - this.options?.requestOptions + this.options.requestOptions );🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts` around lines 29 - 38, The code uses redundant optional chaining for requestOptions in initializeWriter; since options is required on the SessionStreamInstance constructor (type SessionStreamInstanceOptions<T>), replace this.options?.requestOptions with this.options.requestOptions inside the initializeWriter method (the call to this.options.apiClient.initializeSessionStream) and remove the unnecessary ?. to simplify the expression.apps/webapp/app/presenters/v3/SessionPresenter.server.ts (1)
99-109: 💤 Low valueOptional: Consider extracting currentRun lookup for clarity.
The ternary and nullish coalescing on lines 99-108 are correct but require a non-null assertion on line 105. While this is safe (the outer ternary ensures
session.currentRunIdis truthy), extracting this logic into a helper or using an if-statement could improve readability.♻️ Alternative: Extract to helper or use if-statement
- const currentRun = session.currentRunId - ? runsById.get(session.currentRunId) ?? - (await startActiveSpan( - "SessionPresenter.findCurrentRunFallback", - () => - this.replica.taskRun.findFirst({ - where: { id: session.currentRunId! }, - select: { id: true, friendlyId: true, status: true }, - }) - )) - : null; + let currentRun = null; + if (session.currentRunId) { + currentRun = runsById.get(session.currentRunId); + if (!currentRun) { + currentRun = await startActiveSpan( + "SessionPresenter.findCurrentRunFallback", + () => + this.replica.taskRun.findFirst({ + where: { id: session.currentRunId }, + select: { id: true, friendlyId: true, status: true }, + }) + ); + } + }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/webapp/app/presenters/v3/SessionPresenter.server.ts` around lines 99 - 109, The currentRun lookup uses a nested ternary/nullish coalescing with a non-null assertion on session.currentRunId; extract this into a small helper or an if-statement to improve readability and remove the need for the assertion. Specifically, replace the inline expression that builds currentRun (which references session.currentRunId, runsById, startActiveSpan("SessionPresenter.findCurrentRunFallback", ...), and this.replica.taskRun.findFirst) with a clear helper function (e.g., findCurrentRun) or an explicit if-block that: 1) checks if session.currentRunId is set, 2) returns runsById.get(session.currentRunId) if present, and 3) otherwise calls startActiveSpan(...)/this.replica.taskRun.findFirst to fetch and return the run; update the currentRun assignment to call that helper or use the if-block so no non-null assertion is required.apps/webapp/app/v3/services/createBackgroundWorker.server.ts (1)
348-348: ⚡ Quick winThe
as anycast bypasses type safety.Casting
task.agentConfig as anyremoves compile-time guarantees that the config is JSON-serializable. While this may be necessary for Prisma'sJsontype, consider using a Zod schema to validate the config structure before persistence, or at minimum add a comment explaining why the cast is safe.🛡️ Safer alternative with Zod validation
As per coding guidelines, use Zod for validation in apps/webapp:
+import { z } from "zod"; + +const AgentConfigSchema = z.record(z.unknown()).optional(); + async function createWorkerTask( task: TaskResource, ... ) { ... + const validatedConfig = task.agentConfig + ? AgentConfigSchema.parse(task.agentConfig) + : undefined; + await prisma.backgroundWorkerTask.create({ data: { ... - config: task.agentConfig ? (task.agentConfig as any) : undefined, + config: validatedConfig as any, }, }); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/webapp/app/v3/services/createBackgroundWorker.server.ts` at line 348, The cast "task.agentConfig as any" in the createBackgroundWorker flow removes type safety and should be replaced by runtime validation: define a Zod schema (e.g., AgentConfigSchema) that matches the expected JSON-serializable shape, call AgentConfigSchema.parse or safeParse on task.agentConfig inside the function that builds the Prisma payload (the spot assigning config), and use the validated value (or undefined) instead of the as any cast; if validation fails, handle the error (reject/create a clear log and abort persisting) or default to undefined. If a Zod schema cannot be introduced now, add a clear comment explaining why the cast is safe and what invariants guarantee JSON-serializability, referencing the config assignment (config: task.agentConfig ...) so future reviewers know why it's exempt.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 9a6d7e41-895b-4cd9-874d-48dd9f05eb37
📒 Files selected for processing (88)
.changeset/sessions-primitive.md.gitignoreCLAUDE.mdapps/webapp/app/components/BulkActionFilterSummary.tsxapps/webapp/app/components/runs/v3/RunFilters.tsxapps/webapp/app/components/runs/v3/TaskRunsTable.tsxapps/webapp/app/components/runs/v3/TaskTriggerSource.tsxapps/webapp/app/components/sessions/v1/CloseSessionDialog.tsxapps/webapp/app/components/sessions/v1/SessionFilters.tsxapps/webapp/app/components/sessions/v1/SessionStatus.tsxapps/webapp/app/components/sessions/v1/SessionsTable.tsxapps/webapp/app/presenters/RunFilters.server.tsapps/webapp/app/presenters/SessionFilters.server.tsapps/webapp/app/presenters/v3/ApiRunListPresenter.server.tsapps/webapp/app/presenters/v3/NextRunListPresenter.server.tsapps/webapp/app/presenters/v3/SessionListPresenter.server.tsapps/webapp/app/presenters/v3/SessionPresenter.server.tsapps/webapp/app/presenters/v3/TaskListPresenter.server.tsapps/webapp/app/presenters/v3/TestPresenter.server.tsapps/webapp/app/presenters/v3/TestTaskPresenter.server.tsapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions._index/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions/route.tsxapps/webapp/app/routes/api.v1.deployments.current.tsapps/webapp/app/routes/realtime.v1.sessions.$session.$io.records.tsapps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.tsapps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam.realtime.v1.$io.tsapps/webapp/app/routes/resources.sessions.$sessionParam.close.tsapps/webapp/app/routes/runs.$runParam.tsapps/webapp/app/runEngine/concerns/queues.server.tsapps/webapp/app/runEngine/services/triggerTask.server.tsapps/webapp/app/runEngine/types.tsapps/webapp/app/services/apiRateLimit.server.tsapps/webapp/app/services/realtime/mintRunToken.server.tsapps/webapp/app/services/realtime/s2realtimeStreams.server.tsapps/webapp/app/services/realtime/sessionRunManager.server.tsapps/webapp/app/services/runsReplicationService.server.tsapps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.tsapps/webapp/app/services/runsRepository/runsRepository.server.tsapps/webapp/app/services/sessionsRepository/clickhouseSessionsRepository.server.tsapps/webapp/app/services/sessionsRepository/sessionsRepository.server.tsapps/webapp/app/utils/pathBuilder.tsapps/webapp/app/v3/services/createBackgroundWorker.server.tsinternal-packages/clickhouse/schema/029_add_task_kind_to_task_runs_v2.sqlinternal-packages/clickhouse/src/taskRuns.test.tsinternal-packages/clickhouse/src/taskRuns.tsinternal-packages/database/prisma/migrations/20260329100903_add_agent_trigger_source_and_task_config/migration.sqlinternal-packages/database/prisma/migrations/20260330113734_add_playground_conversation/migration.sqlinternal-packages/database/prisma/migrations/20260330135232_add_messages_and_last_event_id_to_playground/migration.sqlinternal-packages/database/prisma/schema.prismapackages/core/src/v3/apiClient/errors.tspackages/core/src/v3/apiClient/runStream.test.tspackages/core/src/v3/apiClient/runStream.tspackages/core/src/v3/inputStreams/index.tspackages/core/src/v3/inputStreams/manager.tspackages/core/src/v3/inputStreams/noopManager.tspackages/core/src/v3/inputStreams/types.tspackages/core/src/v3/realtime-streams-api.tspackages/core/src/v3/realtimeStreams/index.tspackages/core/src/v3/realtimeStreams/manager.tspackages/core/src/v3/realtimeStreams/noopManager.tspackages/core/src/v3/realtimeStreams/sessionStreamInstance.tspackages/core/src/v3/realtimeStreams/streamInstance.tspackages/core/src/v3/realtimeStreams/streamsWriterV1.tspackages/core/src/v3/realtimeStreams/streamsWriterV2.test.tspackages/core/src/v3/realtimeStreams/streamsWriterV2.tspackages/core/src/v3/realtimeStreams/types.tspackages/core/src/v3/schemas/api.tspackages/core/src/v3/schemas/build.tspackages/core/src/v3/schemas/resources.tspackages/core/src/v3/schemas/runEngine.tspackages/core/src/v3/schemas/schemas.tspackages/core/src/v3/semanticInternalAttributes.tspackages/core/src/v3/session-streams-api.tspackages/core/src/v3/sessionStreams/index.tspackages/core/src/v3/sessionStreams/manager.test.tspackages/core/src/v3/sessionStreams/manager.tspackages/core/src/v3/sessionStreams/noopManager.tspackages/core/src/v3/sessionStreams/types.tspackages/core/src/v3/test/test-input-stream-manager.tspackages/core/src/v3/test/test-realtime-streams-manager.tspackages/core/src/v3/test/test-run-metadata-manager.tspackages/core/src/v3/test/test-session-stream-manager.tspackages/core/src/v3/types/tasks.tspackages/core/src/v3/utils/globals.tspackages/core/src/v3/workers/index.tspackages/core/src/v3/workers/taskExecutor.tspackages/core/test/runStream.test.ts
| import { CheckCircleIcon, ClockIcon } from "@heroicons/react/20/solid"; | ||
| import assertNever from "assert-never"; | ||
| import { type SessionStatus } from "~/services/sessionsRepository/sessionsRepository.server"; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify the server-only import and locate a shared non-server SessionStatus definition.
rg -n 'sessionsRepository\.server' apps/webapp/app/components/sessions/v1/SessionStatus.tsx
rg -n '\b(SessionStatus|allSessionStatuses)\b' apps/webapp/app packages/core -g '!**/*.server.ts'Repository: triggerdotdev/trigger.dev
Length of output: 3838
Replace the .server module import with the type from @trigger.dev/core.
SessionStatus.tsx is a client-side component but imports SessionStatus from a .server module, which violates Remix's server/client boundary and will fail bundling. Import the type from packages/core/src/v3/schemas/api.ts instead, which provides the same SessionStatus type definition.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@apps/webapp/app/components/sessions/v1/SessionStatus.tsx` around lines 1 - 3,
SessionStatus.tsx currently imports the SessionStatus type from a ".server"
module which breaks the client/server boundary; change the import to pull the
type from the core package (import type { SessionStatus } from
"@trigger.dev/core") so the component remains client-safe, and use a type-only
import to avoid bundling server code while keeping existing symbols like
SessionStatus, CheckCircleIcon, and ClockIcon unchanged.
| ? this.replica.taskRun.findMany({ | ||
| where: { id: { in: currentRunIds } }, | ||
| select: { id: true, friendlyId: true }, | ||
| }) |
There was a problem hiding this comment.
Scope the current-run lookup to the same project/environment.
Session.currentRunId is a plain string column with no FK, so resolving runs by id alone can surface another tenant’s run if that pointer is ever stale or corrupted. This follow-up lookup should be constrained to the same projectId and environmentId.
Suggested fix
return currentRunIds.length > 0
? this.replica.taskRun.findMany({
- where: { id: { in: currentRunIds } },
+ where: {
+ id: { in: currentRunIds },
+ projectId,
+ runtimeEnvironmentId: environmentId,
+ },
select: { id: true, friendlyId: true },
})
: [];🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@apps/webapp/app/presenters/v3/SessionListPresenter.server.ts` around lines
163 - 166, The current lookup using this.replica.taskRun.findMany({ where: { id:
{ in: currentRunIds } }, ... }) is unsafe because Session.currentRunId has no FK
and can return runs from other projects/environments; update the where clause to
also constrain by projectId and environmentId (e.g. include projectId: { in:
currentProjectIds } and environmentId: { in: currentEnvironmentIds } or match
the session's projectId/environmentId context) so the findMany call only returns
taskRun rows belonging to the same project/environment as the sessions; adjust
how currentRunIds are grouped/mapped if needed to ensure correct scoping for
this.replica.taskRun.findMany and references to Session.currentRunId.
| const environment = await findEnvironmentBySlug(project.id, envParam, userId); | ||
| if (!environment) { | ||
| throw new Error("Environment not found"); |
There was a problem hiding this comment.
Return a user-facing 404 here instead of throwing a generic error.
Line 42 turns a bad or unauthorized environment slug into an unhandled route error, while the adjacent project-not-found path already uses user-facing handling.
Suggested fix
const environment = await findEnvironmentBySlug(project.id, envParam, userId);
if (!environment) {
- throw new Error("Environment not found");
+ throw new Response("Environment not found", { status: 404 });
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const environment = await findEnvironmentBySlug(project.id, envParam, userId); | |
| if (!environment) { | |
| throw new Error("Environment not found"); | |
| const environment = await findEnvironmentBySlug(project.id, envParam, userId); | |
| if (!environment) { | |
| throw new Response("Environment not found", { status: 404 }); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@apps/webapp/app/routes/_app.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.sessions._index/route.tsx
around lines 40 - 42, Replace the generic throw in the environment lookup with a
user-facing 404 response: when findEnvironmentBySlug(project.id, envParam,
userId) returns falsy, throw or return a Response with status 404 (e.g., throw
new Response("Environment not found", { status: 404 })) so the route renders a
proper Not Found page instead of bubbling a generic Error; update the branch
that currently throws new Error("Environment not found") to use this 404
Response (referencing findEnvironmentBySlug and the environment check in the
route handler).
| const environment = await findEnvironmentBySlug(project.id, envParam, userId); | ||
| if (!environment) { | ||
| throw new Error("Environment not found"); | ||
| } |
There was a problem hiding this comment.
Handle a missing environment as a 404, not a generic error.
Line 74 makes an invalid or unauthorized environment slug fall into the route error boundary/500 path instead of returning a user-facing not-found response.
Suggested fix
const environment = await findEnvironmentBySlug(project.id, envParam, userId);
if (!environment) {
- throw new Error("Environment not found");
+ throw new Response("Environment not found", { status: 404 });
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const environment = await findEnvironmentBySlug(project.id, envParam, userId); | |
| if (!environment) { | |
| throw new Error("Environment not found"); | |
| } | |
| const environment = await findEnvironmentBySlug(project.id, envParam, userId); | |
| if (!environment) { | |
| throw new Response("Environment not found", { status: 404 }); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@apps/webapp/app/routes/_app.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx
around lines 72 - 75, The code currently throws a generic Error when
findEnvironmentBySlug returns null; change this to return a 404 response so
missing/unauthorized env slugs hit the not-found UI instead of the error
boundary. Replace the throw new Error("Environment not found") in the block
after await findEnvironmentBySlug(project.id, envParam, userId) with a 404
response (e.g., throw new Response("Not Found", { status: 404 }) or use your
framework's notFound() helper) so the route returns a proper 404 for a missing
environment.
| select: { | ||
| deployment: { | ||
| select: { | ||
| friendlyId: true, | ||
| createdAt: true, | ||
| shortCode: true, | ||
| version: true, | ||
| runtime: true, | ||
| runtimeVersion: true, | ||
| status: true, | ||
| deployedAt: true, | ||
| git: true, | ||
| errorData: true, | ||
| }, |
There was a problem hiding this comment.
Return updatedAt to match the shared response schema.
RetrieveCurrentDeploymentResponseBody now reuses ApiDeploymentListResponseItem, which requires updatedAt, but this loader neither selects nor returns it. Clients validating against the shared schema will fail on this endpoint until the field is included.
Suggested fix
select: {
deployment: {
select: {
friendlyId: true,
createdAt: true,
+ updatedAt: true,
shortCode: true,
version: true,
runtime: true,
runtimeVersion: true,
status: true,
@@
return json({
id: deployment.friendlyId,
createdAt: deployment.createdAt,
+ updatedAt: deployment.updatedAt,
shortCode: deployment.shortCode,
version: deployment.version,
runtime: deployment.runtime,Also applies to: 42-53
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@apps/webapp/app/routes/api.v1.deployments.current.ts` around lines 20 - 33,
The loader is missing the updatedAt field required by
ApiDeploymentListResponseItem/ RetrieveCurrentDeploymentResponseBody; update the
deployment select in this route so the nested deployment.select includes
updatedAt (add updatedAt: true wherever deployment is being selected in this
file) so the returned object matches the shared response schema.
| vi.mock("@s2-dev/streamstore", async (importOriginal) => { | ||
| const actual = await importOriginal<typeof import("@s2-dev/streamstore")>(); | ||
| return { | ||
| ...actual, | ||
| S2: class FakeS2 { | ||
| basin() { | ||
| return { | ||
| stream: () => ({ | ||
| appendSession, | ||
| }), | ||
| }; | ||
| } | ||
| }, | ||
| }; | ||
| }); |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Avoid vi.mock() here; test this path without module mocking.
This mock sidesteps the exact appendSession() / lastAckedPosition() behavior that StreamsWriterV2.wait() now depends on, and it also conflicts with the repo’s test rules. A small extraction of the JSON-envelope/size-check logic into a pure helper would let you cover the oversize branch without mocking @s2-dev/streamstore.
As per coding guidelines, "Use vitest exclusively for testing. Never mock anything - use testcontainers instead."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/core/src/v3/realtimeStreams/streamsWriterV2.test.ts` around lines 18
- 32, Remove the vi.mock block and test this path without mocking; instead
extract the JSON-envelope/size-check logic from StreamsWriterV2.wait() into a
pure helper (e.g., computeEnvelopeSize or isEnvelopeOversize) and update
StreamsWriterV2.wait() to call that helper so you can unit-test the oversize
branch directly in streamsWriterV2.test.ts. For tests that require real
appendSession/lastAckedPosition behavior, replace the mocked module usage with
an integration test using testcontainers to start the real `@s2-dev/streamstore`
service and assert StreamsWriterV2.wait() behavior against the real
appendSession/lastAckedPosition semantics. Ensure references to appendSession,
lastAckedPosition, and StreamsWriterV2.wait are updated to call the new helper
and to use testcontainers-based integration tests instead of vi.mock.
| /** Per-run wall-clock cap (seconds). Forwarded to `TaskRunOptions.maxDuration`. */ | ||
| maxDuration: z.number().int().positive().optional(), | ||
| /** Pin every run to a specific worker version. Forwarded to `TaskRunOptions.lockToVersion`. */ | ||
| lockToVersion: z.string().optional(), | ||
| /** Region to schedule runs in. Forwarded to `TaskRunOptions.region`. */ | ||
| region: z.string().optional(), |
There was a problem hiding this comment.
Match session maxDuration to the existing 5-second floor.
TriggerOptions.maxDuration is documented in this file as requiring at least 5 seconds, but SessionTriggerConfig.maxDuration currently accepts 1..4. That lets callers persist a session config that only fails later when the session tries to trigger a run.
Suggested fix
- maxDuration: z.number().int().positive().optional(),
+ maxDuration: z.number().int().min(5).optional(),As per coding guidelines: {packages/core,apps/webapp}/**/*.{ts,tsx} should use Zod for validation in packages/core and apps/webapp.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /** Per-run wall-clock cap (seconds). Forwarded to `TaskRunOptions.maxDuration`. */ | |
| maxDuration: z.number().int().positive().optional(), | |
| /** Pin every run to a specific worker version. Forwarded to `TaskRunOptions.lockToVersion`. */ | |
| lockToVersion: z.string().optional(), | |
| /** Region to schedule runs in. Forwarded to `TaskRunOptions.region`. */ | |
| region: z.string().optional(), | |
| /** Per-run wall-clock cap (seconds). Forwarded to `TaskRunOptions.maxDuration`. */ | |
| maxDuration: z.number().int().min(5).optional(), | |
| /** Pin every run to a specific worker version. Forwarded to `TaskRunOptions.lockToVersion`. */ | |
| lockToVersion: z.string().optional(), | |
| /** Region to schedule runs in. Forwarded to `TaskRunOptions.region`. */ | |
| region: z.string().optional(), |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/core/src/v3/schemas/api.ts` around lines 1498 - 1503,
SessionTriggerConfig.maxDuration allows values 1–4 but must match
TriggerOptions.maxDuration's 5-second minimum; update the Zod schema for
SessionTriggerConfig.maxDuration in packages/core/src/v3/schemas/api.ts to
enforce an integer minimum of 5 seconds (e.g., use .int().min(5) or
.int().gte(5) before .optional()), ensuring the same validation as
TriggerOptions.maxDuration so invalid configs are rejected at persist time.
| async #runTail( | ||
| sessionId: string, | ||
| io: SessionChannelIO, | ||
| signal: AbortSignal | ||
| ): Promise<void> { | ||
| const key = keyFor(sessionId, io); | ||
| try { | ||
| const lastSeq = this.seqNums.get(key); | ||
| // Dispatch is driven from `onPart` (not the for-await loop) so each | ||
| // record reaches dispatch with its full SSE metadata in scope — | ||
| // specifically the timestamp, which we need for the per-stream | ||
| // min-timestamp filter. The for-await loop below just drains the | ||
| // pipeThrough output to keep the source flowing. | ||
| const stream = await this.apiClient.subscribeToSessionStream<unknown>(sessionId, io, { | ||
| signal, | ||
| baseUrl: this.baseUrl, | ||
| timeoutInSeconds: 600, | ||
| lastEventId: lastSeq !== undefined ? String(lastSeq) : undefined, | ||
| onPart: (part) => { | ||
| if (signal.aborted) return; | ||
| const seqNum = parseInt(part.id, 10); | ||
| if (Number.isFinite(seqNum)) { | ||
| this.seqNums.set(key, seqNum); | ||
| } | ||
|
|
||
| // Min-timestamp filter: drop records older than (or at) the | ||
| // bound. Used to skip already-processed records on OOM-retry | ||
| // boot. | ||
| const minTs = this.minTimestamps.get(key); | ||
| if (minTs !== undefined && part.timestamp <= minTs) { | ||
| return; | ||
| } | ||
|
|
||
| let data: unknown = part.chunk; | ||
| if (typeof data === "string") { | ||
| try { | ||
| data = JSON.parse(data); | ||
| } catch { | ||
| // keep as string | ||
| } | ||
| } | ||
| this.#dispatch(key, data); | ||
| }, | ||
| onComplete: () => { | ||
| if (this.debug) { | ||
| console.log(`[SessionStreamManager] Tail completed for "${key}"`); | ||
| } | ||
| }, | ||
| onError: (error) => { | ||
| if (this.debug) { | ||
| console.error(`[SessionStreamManager] Tail error for "${key}":`, error); | ||
| } | ||
| }, | ||
| }); | ||
|
|
||
| // Drain to keep the pipeThrough flowing. Records were already | ||
| // dispatched in `onPart`, so the body here is a no-op. | ||
| for await (const _record of stream) { | ||
| if (signal.aborted) break; | ||
| } | ||
| } catch (error) { | ||
| if (error instanceof Error && error.name === "AbortError") return; | ||
| throw error; | ||
| } | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Compare reconnect/backoff behavior between InputStreamManager and SessionStreamManager
fd -t f 'manager.ts' packages/core/src/v3/inputStreams packages/core/src/v3/sessionStreams --exec cat {}Repository: triggerdotdev/trigger.dev
Length of output: 27244
🏁 Script executed:
# Check if there's any backoff/retry delay mechanism in either manager or apiClient
rg -t ts "backoff|delay|setTimeout.*reconnect|minDelay|exponential" packages/core/src/v3/Repository: triggerdotdev/trigger.dev
Length of output: 16648
🏁 Script executed:
# Find subscribeToSessionStream definition
rg -t ts "subscribeToSessionStream" packages/core/src/v3 -A 5 | head -50Repository: triggerdotdev/trigger.dev
Length of output: 2242
🏁 Script executed:
# Find the actual subscribeToSessionStream definition in apiClient
fd -t f -name "*.ts" packages/core/src/v3/apiClient --exec grep -l "subscribeToSessionStream" {}Repository: triggerdotdev/trigger.dev
Length of output: 300
🏁 Script executed:
# Find the subscribeToSessionStream method definition
rg -t ts "subscribeToSessionStream\s*\(" packages/core/src/v3/apiClient -A 30Repository: triggerdotdev/trigger.dev
Length of output: 51
🏁 Script executed:
# Search for ApiClient class and subscribeToSessionStream method
rg -t ts "class ApiClient|subscribeToSessionStream" packages/core/src/v3 -B 2 -A 10Repository: triggerdotdev/trigger.dev
Length of output: 6608
🏁 Script executed:
# Get the full ApiClient class to find subscribeToSessionStream method
rg -t ts "subscribeToSessionStream" packages/core/src/v3/apiClient/index.ts -A 25 -B 2Repository: triggerdotdev/trigger.dev
Length of output: 51
🏁 Script executed:
# Search all files for subscribeToSessionStream definition
rg -t ts "subscribeToSessionStream.*\{" packages/core/src/v3 -A 30Repository: triggerdotdev/trigger.dev
Length of output: 2591
🏁 Script executed:
# Search for the method signature in ApiClient
rg -t ts "async subscribeToSessionStream|subscribeToSessionStream\s*<" packages/core/src/v3/apiClient/ -A 20Repository: triggerdotdev/trigger.dev
Length of output: 51
🏁 Script executed:
# Search for subscribeToSessionStream in type definitions
rg -t ts "subscribeToSessionStream" packages/core/src/v3 --type-list | head -20Repository: triggerdotdev/trigger.dev
Length of output: 459
🏁 Script executed:
# Look at the ApiClient index file structure
head -200 packages/core/src/v3/apiClient/index.ts | tail -100Repository: triggerdotdev/trigger.dev
Length of output: 2479
🏁 Script executed:
# Find where subscribeToSessionStream is defined in apiClient
grep -n "subscribeToSessionStream" packages/core/src/v3/apiClient/index.tsRepository: triggerdotdev/trigger.dev
Length of output: 51
🏁 Script executed:
# List all files in apiClient directory
ls -la packages/core/src/v3/apiClient/Repository: triggerdotdev/trigger.dev
Length of output: 887
🏁 Script executed:
# Search for subscribeToSessionStream method in index.ts
rg "subscribeToSessionStream" packages/core/src/v3/apiClient/index.tsRepository: triggerdotdev/trigger.dev
Length of output: 51
🏁 Script executed:
# Search all of packages/core for the definition of subscribeToSessionStream
rg "subscribeToSessionStream.*=" packages/core/src/v3 -A 5Repository: triggerdotdev/trigger.dev
Length of output: 51
🏁 Script executed:
# Search for subscribeToSessionStream definition with more flexibility
rg "subscribeToSessionStream" packages/core/src/v3 -B 2 -A 15 | head -60Repository: triggerdotdev/trigger.dev
Length of output: 5405
🏁 Script executed:
# Search entire codebase for the method implementation
rg "subscribeToSessionStream\s*\(" packages/core/src -A 20 | grep -v "test.ts" | grep -v "await this.apiClient"Repository: triggerdotdev/trigger.dev
Length of output: 51
Add backoff and error handling to prevent reconnect loops on persistent failures.
In #runTail (lines 335-339), onError only logs (when debug is on) and never aborts or rethrows. If subscribeToSessionStream fails with a non-abort error, the for-await loop exits normally, #runTail returns, and the .finally reconnect path at lines 251-260 immediately re-attaches whenever hasHandlers || hasWaiters. With a persistent failure (auth rejected, 5xx, DNS, etc.) and an on() handler installed, this creates an unbounded tight reconnect loop with no backoff.
The same issue exists in StandardInputStreamManager—both managers lack exponential backoff on reconnect failures. Consider adding minimum backoff before reconnect, capping retry attempts, or surfacing terminal errors through persistent handlers / pending once() waiters so callers can react.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/core/src/v3/sessionStreams/manager.ts` around lines 287 - 351, The
tail loop in `#runTail` uses subscribeToSessionStream with an onError that only
logs, causing immediate reconnection loops when persistent errors occur; update
onError to surface terminal failures by aborting the provided AbortSignal or
throwing a distinct error so the outer caller can detect a failure, and
implement exponential backoff with a capped delay and retry limit in the
reconnect logic that currently decides to re-attach when hasHandlers ||
hasWaiters (the same pattern should be applied to StandardInputStreamManager).
Concretely: modify the onError handler passed into subscribeToSessionStream to
call signal.abort() or forward a non-Abort terminal Error to cause `#runTail` to
exit with an error, then in the surrounding reconnect/finally path add a backoff
loop (backoff base, multiplier, maxDelay, and maxAttempts) before re-invoking
`#runTail`; ensure seqNums/minTimestamps/#dispatch behavior is unchanged and that
aborts still return cleanly for AbortError.
| /** | ||
| * Trigger a task and subscribe to its updates via realtime. Unlike `triggerAndWait`, | ||
| * this does NOT suspend the parent run — the parent stays alive and polls for updates. | ||
| * This enables parallel tool calls and proper abort signal handling. | ||
| * | ||
| * @param payload | ||
| * @param options - Options for the task run, including an optional `signal` to cancel the subscription and child run | ||
| * @returns TaskRunPromise | ||
| * @example | ||
| * ``` | ||
| * const result = await task.triggerAndSubscribe({ foo: "bar" }, { signal: abortSignal }); | ||
| * | ||
| * if (result.ok) { | ||
| * console.log(result.output); | ||
| * } else { | ||
| * console.error(result.error); | ||
| * } | ||
| * ``` | ||
| */ | ||
| triggerAndSubscribe: ( | ||
| payload: TInput, | ||
| options?: TriggerAndSubscribeOptions, | ||
| ) => TaskRunPromise<TIdentifier, TOutput>; |
There was a problem hiding this comment.
Add a changeset for this SDK surface change.
triggerAndSubscribe()/TriggerAndSubscribeOptions extend the public packages/core API, so this PR needs a changeset before merge or consumers won't get a versioned release note for the new surface.
Based on learnings: When modifying any public package (packages/* or integrations/*), add a changeset using pnpm run changeset:add.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/core/src/v3/types/tasks.ts` around lines 650 - 672, This PR adds a
new public API surface (triggerAndSubscribe and TriggerAndSubscribeOptions) in
packages/core, so add a changeset describing this public API addition: run `pnpm
run changeset:add`, select the "packages/core" package, choose a version bump
(likely patch), and write a short summary noting the new
Task.triggerAndSubscribe and TriggerAndSubscribeOptions export so consumers
receive a release note; commit the generated changeset file with the PR.
Adds Sessions, a durable, run-aware stream primitive that scopes session.in / session.out records to a session (not a single run). Records survive run boundaries; reconnect-from-last-event-id is built in. Server foundation: - New /realtime/v1/sessions/:session/:io/append + /records routes - sessionRunManager + sessionsRepository + clickhouseSessionsRepository - mintRunToken for short-lived per-session tokens - s2Append retry-with-backoff + undici cause diagnostics - /api/v[12]/packets/* exempt from customer rate limits - BackgroundWorker schema gains taskKind enum (TASK, AGENT, SCHEDULED) - TaskRun.taskKind column + clickhouse 029_add_task_kind_to_task_runs_v2 Core types: - new sessionStreams, inputStreams, realtimeStreams packages in @trigger.dev/core - session-streams-api / realtime-streams-api surface Sessions dashboard UI (the primitive's own viewer): - /sessions index + detail routes - SessionsTable, SessionFilters, SessionStatus, CloseSessionDialog - AGENT/SCHEDULED filter in RunFilters + TaskTriggerSource Includes the sessions-primitive changeset.
b84d537 to
ed7bf97
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/core/src/v3/schemas/runEngine.ts`:
- Around line 18-19: The TaskKind schema currently uses .or(anyString) which
widens the canonical TaskKind to any string and destroys the literal type;
revert TaskKind to just the z.enum(["STANDARD","SCHEDULED","AGENT"]) definition
so TaskKind infers the union of those three literals, and remove the
.or(anyString) usage; if you need a permissive parser keep a separate exported
schema (e.g., TaskKindLoose = TaskKind.or(z.string())) for forward-compat
parsing while leaving TaskKind strict; update references that relied on the
previous loose schema to use TaskKindLoose where appropriate.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 13f751b0-d010-4fef-b3c2-a1cb754be003
📒 Files selected for processing (88)
.changeset/sessions-primitive.md.gitignoreCLAUDE.mdapps/webapp/app/components/BulkActionFilterSummary.tsxapps/webapp/app/components/runs/v3/RunFilters.tsxapps/webapp/app/components/runs/v3/TaskRunsTable.tsxapps/webapp/app/components/runs/v3/TaskTriggerSource.tsxapps/webapp/app/components/sessions/v1/CloseSessionDialog.tsxapps/webapp/app/components/sessions/v1/SessionFilters.tsxapps/webapp/app/components/sessions/v1/SessionStatus.tsxapps/webapp/app/components/sessions/v1/SessionsTable.tsxapps/webapp/app/presenters/RunFilters.server.tsapps/webapp/app/presenters/SessionFilters.server.tsapps/webapp/app/presenters/v3/ApiRunListPresenter.server.tsapps/webapp/app/presenters/v3/NextRunListPresenter.server.tsapps/webapp/app/presenters/v3/SessionListPresenter.server.tsapps/webapp/app/presenters/v3/SessionPresenter.server.tsapps/webapp/app/presenters/v3/TaskListPresenter.server.tsapps/webapp/app/presenters/v3/TestPresenter.server.tsapps/webapp/app/presenters/v3/TestTaskPresenter.server.tsapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions._index/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions/route.tsxapps/webapp/app/routes/api.v1.deployments.current.tsapps/webapp/app/routes/realtime.v1.sessions.$session.$io.records.tsapps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.tsapps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam.realtime.v1.$io.tsapps/webapp/app/routes/resources.sessions.$sessionParam.close.tsapps/webapp/app/routes/runs.$runParam.tsapps/webapp/app/runEngine/concerns/queues.server.tsapps/webapp/app/runEngine/services/triggerTask.server.tsapps/webapp/app/runEngine/types.tsapps/webapp/app/services/apiRateLimit.server.tsapps/webapp/app/services/realtime/mintRunToken.server.tsapps/webapp/app/services/realtime/s2realtimeStreams.server.tsapps/webapp/app/services/realtime/sessionRunManager.server.tsapps/webapp/app/services/runsReplicationService.server.tsapps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.tsapps/webapp/app/services/runsRepository/runsRepository.server.tsapps/webapp/app/services/sessionsRepository/clickhouseSessionsRepository.server.tsapps/webapp/app/services/sessionsRepository/sessionsRepository.server.tsapps/webapp/app/utils/pathBuilder.tsapps/webapp/app/v3/services/createBackgroundWorker.server.tsinternal-packages/clickhouse/schema/029_add_task_kind_to_task_runs_v2.sqlinternal-packages/clickhouse/src/taskRuns.test.tsinternal-packages/clickhouse/src/taskRuns.tsinternal-packages/database/prisma/migrations/20260329100903_add_agent_trigger_source_and_task_config/migration.sqlinternal-packages/database/prisma/migrations/20260330113734_add_playground_conversation/migration.sqlinternal-packages/database/prisma/migrations/20260330135232_add_messages_and_last_event_id_to_playground/migration.sqlinternal-packages/database/prisma/schema.prismapackages/core/src/v3/apiClient/errors.tspackages/core/src/v3/apiClient/runStream.test.tspackages/core/src/v3/apiClient/runStream.tspackages/core/src/v3/inputStreams/index.tspackages/core/src/v3/inputStreams/manager.tspackages/core/src/v3/inputStreams/noopManager.tspackages/core/src/v3/inputStreams/types.tspackages/core/src/v3/realtime-streams-api.tspackages/core/src/v3/realtimeStreams/index.tspackages/core/src/v3/realtimeStreams/manager.tspackages/core/src/v3/realtimeStreams/noopManager.tspackages/core/src/v3/realtimeStreams/sessionStreamInstance.tspackages/core/src/v3/realtimeStreams/streamInstance.tspackages/core/src/v3/realtimeStreams/streamsWriterV1.tspackages/core/src/v3/realtimeStreams/streamsWriterV2.test.tspackages/core/src/v3/realtimeStreams/streamsWriterV2.tspackages/core/src/v3/realtimeStreams/types.tspackages/core/src/v3/schemas/api.tspackages/core/src/v3/schemas/build.tspackages/core/src/v3/schemas/resources.tspackages/core/src/v3/schemas/runEngine.tspackages/core/src/v3/schemas/schemas.tspackages/core/src/v3/semanticInternalAttributes.tspackages/core/src/v3/session-streams-api.tspackages/core/src/v3/sessionStreams/index.tspackages/core/src/v3/sessionStreams/manager.test.tspackages/core/src/v3/sessionStreams/manager.tspackages/core/src/v3/sessionStreams/noopManager.tspackages/core/src/v3/sessionStreams/types.tspackages/core/src/v3/test/test-input-stream-manager.tspackages/core/src/v3/test/test-realtime-streams-manager.tspackages/core/src/v3/test/test-run-metadata-manager.tspackages/core/src/v3/test/test-session-stream-manager.tspackages/core/src/v3/types/tasks.tspackages/core/src/v3/utils/globals.tspackages/core/src/v3/workers/index.tspackages/core/src/v3/workers/taskExecutor.tspackages/core/test/runStream.test.ts
✅ Files skipped from review due to trivial changes (10)
- packages/core/src/v3/semanticInternalAttributes.ts
- packages/core/src/v3/workers/index.ts
- CLAUDE.md
- .gitignore
- internal-packages/database/prisma/migrations/20260330113734_add_playground_conversation/migration.sql
- internal-packages/database/prisma/migrations/20260329100903_add_agent_trigger_source_and_task_config/migration.sql
- .changeset/sessions-primitive.md
- internal-packages/clickhouse/schema/029_add_task_kind_to_task_runs_v2.sql
- packages/core/src/v3/realtimeStreams/index.ts
- internal-packages/clickhouse/src/taskRuns.test.ts
🚧 Files skipped from review as they are similar to previous changes (76)
- apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
- apps/webapp/app/components/BulkActionFilterSummary.tsx
- apps/webapp/app/presenters/v3/TaskListPresenter.server.ts
- internal-packages/database/prisma/migrations/20260330135232_add_messages_and_last_event_id_to_playground/migration.sql
- apps/webapp/app/presenters/SessionFilters.server.ts
- apps/webapp/app/services/runsReplicationService.server.ts
- packages/core/src/v3/realtimeStreams/streamsWriterV1.ts
- packages/core/src/v3/realtimeStreams/noopManager.ts
- packages/core/src/v3/realtimeStreams/streamInstance.ts
- packages/core/src/v3/realtime-streams-api.ts
- apps/webapp/app/v3/services/createBackgroundWorker.server.ts
- packages/core/src/v3/schemas/resources.ts
- packages/core/src/v3/inputStreams/noopManager.ts
- apps/webapp/app/components/runs/v3/TaskRunsTable.tsx
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
- apps/webapp/app/presenters/RunFilters.server.ts
- packages/core/src/v3/inputStreams/manager.ts
- packages/core/src/v3/schemas/schemas.ts
- packages/core/src/v3/workers/taskExecutor.ts
- packages/core/src/v3/realtimeStreams/manager.ts
- packages/core/src/v3/inputStreams/types.ts
- packages/core/src/v3/apiClient/errors.ts
- packages/core/src/v3/utils/globals.ts
- packages/core/src/v3/realtimeStreams/streamsWriterV2.test.ts
- apps/webapp/app/services/realtime/mintRunToken.server.ts
- apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions/route.tsx
- apps/webapp/app/routes/runs.$runParam.ts
- apps/webapp/app/presenters/v3/SessionPresenter.server.ts
- packages/core/src/v3/schemas/api.ts
- apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions._index/route.tsx
- apps/webapp/app/components/sessions/v1/CloseSessionDialog.tsx
- apps/webapp/app/routes/api.v1.deployments.current.ts
- packages/core/src/v3/sessionStreams/types.ts
- apps/webapp/app/components/sessions/v1/SessionStatus.tsx
- apps/webapp/app/services/runsRepository/runsRepository.server.ts
- apps/webapp/app/presenters/v3/TestPresenter.server.ts
- packages/core/src/v3/sessionStreams/manager.test.ts
- packages/core/test/runStream.test.ts
- packages/core/src/v3/types/tasks.ts
- apps/webapp/app/routes/resources.sessions.$sessionParam.close.ts
- apps/webapp/app/components/runs/v3/TaskTriggerSource.tsx
- apps/webapp/app/services/realtime/s2realtimeStreams.server.ts
- apps/webapp/app/services/sessionsRepository/clickhouseSessionsRepository.server.ts
- apps/webapp/app/utils/pathBuilder.ts
- apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam/route.tsx
- packages/core/src/v3/session-streams-api.ts
- apps/webapp/app/presenters/v3/SessionListPresenter.server.ts
- packages/core/src/v3/realtimeStreams/streamsWriterV2.ts
- apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
- packages/core/src/v3/test/test-realtime-streams-manager.ts
- packages/core/src/v3/sessionStreams/index.ts
- apps/webapp/app/services/apiRateLimit.server.ts
- packages/core/src/v3/inputStreams/index.ts
- apps/webapp/app/routes/realtime.v1.sessions.$session.$io.records.ts
- packages/core/src/v3/sessionStreams/noopManager.ts
- internal-packages/clickhouse/src/taskRuns.ts
- internal-packages/database/prisma/schema.prisma
- apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts
- apps/webapp/app/services/realtime/sessionRunManager.server.ts
- apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sessions.$sessionParam.realtime.v1.$io.ts
- apps/webapp/app/runEngine/services/triggerTask.server.ts
- apps/webapp/app/services/sessionsRepository/sessionsRepository.server.ts
- apps/webapp/app/runEngine/types.ts
- packages/core/src/v3/test/test-input-stream-manager.ts
- packages/core/src/v3/test/test-run-metadata-manager.ts
- packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts
- apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts
- packages/core/src/v3/sessionStreams/manager.ts
- packages/core/src/v3/apiClient/runStream.test.ts
- apps/webapp/app/components/sessions/v1/SessionFilters.tsx
- apps/webapp/app/components/runs/v3/RunFilters.tsx
- apps/webapp/app/components/sessions/v1/SessionsTable.tsx
- packages/core/src/v3/apiClient/runStream.ts
- apps/webapp/app/runEngine/concerns/queues.server.ts
- packages/core/src/v3/realtimeStreams/types.ts
- packages/core/src/v3/test/test-session-stream-manager.ts
📜 Review details
🧰 Additional context used
📓 Path-based instructions (6)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects insteadImport from
@trigger.dev/coresubpaths only, never from the root of the package
Files:
packages/core/src/v3/schemas/runEngine.tspackages/core/src/v3/schemas/build.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use zod for validation in packages/core and apps/webapp
Files:
packages/core/src/v3/schemas/runEngine.tspackages/core/src/v3/schemas/build.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Use
pnpm run typecheck --filter <package>for verification in apps and internal packages, notbuild. Building proves almost nothing about correctness.
Files:
packages/core/src/v3/schemas/runEngine.tspackages/core/src/v3/schemas/build.ts
**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)
**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queriesAlways import from
@trigger.dev/sdk, never from@trigger.dev/sdk/v3or deprecatedclient.defineJobAdd
//@Crumbsmarkers or wrap blocks in `// `#region` `@crumbsfor debug tracing as you write code, not just when debugging. These are stripped before merge.
Files:
packages/core/src/v3/schemas/runEngine.tspackages/core/src/v3/schemas/build.ts
packages/core/**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (packages/core/CLAUDE.md)
Never import the root package (
@trigger.dev/core). Always use subpath imports such as@trigger.dev/core/v3,@trigger.dev/core/v3/utils,@trigger.dev/core/logger, or@trigger.dev/core/schemas
Files:
packages/core/src/v3/schemas/runEngine.tspackages/core/src/v3/schemas/build.ts
**/*.{ts,tsx,js,jsx,json,md,css,scss}
📄 CodeRabbit inference engine (AGENTS.md)
Code formatting is enforced using Prettier. Run
pnpm run formatbefore committing
Files:
packages/core/src/v3/schemas/runEngine.tspackages/core/src/v3/schemas/build.ts
🧠 Learnings (3)
📓 Common learnings
Learnt from: CR
Repo: triggerdotdev/trigger.dev
Timestamp: 2026-05-11T19:01:52.274Z
Learning: This is a pnpm 10.33.2 monorepo using Turborepo. Run commands from root with `pnpm run`.
Learnt from: CR
Repo: triggerdotdev/trigger.dev
Timestamp: 2026-05-11T19:01:52.274Z
Learning: Use vitest exclusively for testing. Run tests using `pnpm run test --filter <package>` or `pnpm run test ./path/to/test.ts --run` for single files.
Learnt from: CR
Repo: triggerdotdev/trigger.dev
Timestamp: 2026-05-11T19:01:52.274Z
Learning: Request flow architecture: User API call -> Webapp routes -> Services -> RunEngine -> Redis Queue -> Supervisor -> Container execution -> Results back through RunEngine -> ClickHouse (analytics) + PostgreSQL (state)
Learnt from: CR
Repo: triggerdotdev/trigger.dev
Timestamp: 2026-05-11T19:01:52.274Z
Learning: The `apps/webapp/app/v3/` directory contains mostly V2 code that is actively used. Only specific files are V1-only legacy. When encountering V1/V2 branching in services, only modify V2 code paths. All new work uses Run Engine 2.0 and redis-worker.
Learnt from: CR
Repo: triggerdotdev/trigger.dev
Timestamp: 2026-05-11T19:01:52.274Z
Learning: Do NOT update `rules/` directory or `.claude/skills/trigger-dev-tasks/` unless explicitly asked - these are maintained in separate dedicated passes.
📚 Learning: 2026-03-22T13:26:12.060Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3244
File: apps/webapp/app/components/code/TextEditor.tsx:81-86
Timestamp: 2026-03-22T13:26:12.060Z
Learning: In the triggerdotdev/trigger.dev codebase, do not flag `navigator.clipboard.writeText(...)` calls for `missing-await`/`unhandled-promise` issues. These clipboard writes are intentionally invoked without `await` and without `catch` handlers across the project; keep that behavior consistent when reviewing TypeScript/TSX files (e.g., usages like in `apps/webapp/app/components/code/TextEditor.tsx`).
Applied to files:
packages/core/src/v3/schemas/runEngine.tspackages/core/src/v3/schemas/build.ts
📚 Learning: 2026-03-22T19:24:14.403Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3187
File: apps/webapp/app/v3/services/alerts/deliverErrorGroupAlert.server.ts:200-204
Timestamp: 2026-03-22T19:24:14.403Z
Learning: In the triggerdotdev/trigger.dev codebase, webhook URLs are not expected to contain embedded credentials/secrets (e.g., fields like `ProjectAlertWebhookProperties` should only hold credential-free webhook endpoints). During code review, if you see logging or inclusion of raw webhook URLs in error messages, do not automatically treat it as a credential-leak/secrets-in-logs issue by default—first verify the URL does not contain embedded credentials (for example, no username/password in the URL, no obvious secret/token query params or fragments). If the URL is credential-free per this project’s conventions, allow the logging.
Applied to files:
packages/core/src/v3/schemas/runEngine.tspackages/core/src/v3/schemas/build.ts
🔇 Additional comments (3)
packages/core/src/v3/schemas/build.ts (2)
3-9: Import update looks correct and properly scoped.
SkillManifestis imported from the existing local schema module and is used by the new manifest fields.
79-80:skillsschema additions are consistent and backward-compatible.Adding
skillsas optional on bothBuildManifestandWorkerManifestkeeps old manifests valid while enabling the new capability.Also applies to: 98-98
packages/core/src/v3/schemas/runEngine.ts (1)
26-26: No additional concern here beyond theTaskKindschema issue above.
| export const TaskKind = z.enum(["STANDARD", "SCHEDULED", "AGENT"]).or(anyString); | ||
| export type TaskKind = z.infer<typeof TaskKind>; |
There was a problem hiding this comment.
TaskKind validation is currently non-restrictive and loses type safety.
TaskKind.or(anyString) makes the schema accept any string, so TaskKind infers to string and no longer enforces "STANDARD" | "SCHEDULED" | "AGENT" at boundaries.
Suggested fix
-export const TaskKind = z.enum(["STANDARD", "SCHEDULED", "AGENT"]).or(anyString);
+export const TaskKind = z.enum(["STANDARD", "SCHEDULED", "AGENT"]);
export type TaskKind = z.infer<typeof TaskKind>;If you still need forward-compat parsing in select readers, keep a separate loose schema instead of broadening the canonical one:
export const TaskKindLoose = TaskKind.or(z.string());📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| export const TaskKind = z.enum(["STANDARD", "SCHEDULED", "AGENT"]).or(anyString); | |
| export type TaskKind = z.infer<typeof TaskKind>; | |
| export const TaskKind = z.enum(["STANDARD", "SCHEDULED", "AGENT"]); | |
| export type TaskKind = z.infer<typeof TaskKind>; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/core/src/v3/schemas/runEngine.ts` around lines 18 - 19, The TaskKind
schema currently uses .or(anyString) which widens the canonical TaskKind to any
string and destroys the literal type; revert TaskKind to just the
z.enum(["STANDARD","SCHEDULED","AGENT"]) definition so TaskKind infers the union
of those three literals, and remove the .or(anyString) usage; if you need a
permissive parser keep a separate exported schema (e.g., TaskKindLoose =
TaskKind.or(z.string())) for forward-compat parsing while leaving TaskKind
strict; update references that relied on the previous loose schema to use
TaskKindLoose where appropriate.
Layer 1 of 5 in the chat.agent stack split
Adds Sessions, a durable, run-aware stream primitive that scopes
session.in/session.outrecords to a session (not a single run).Records survive run boundaries; reconnect-from-last-event-id is built in.
This is the foundation everything else builds on.
Targets
main— merge this firstReplaces #3173 (closed). Original branch backed up at
backup/tri-7532-pre-stack-split-20260510.Server foundation
/realtime/v1/sessions/:session/:io/append+/recordsroutessessionRunManager+sessionsRepository+clickhouseSessionsRepositorymintRunTokenfor short-lived per-session tokenss2Appendretry-with-backoff + undici cause diagnostics/api/v[12]/packets/*exempt from customer rate limitsBackgroundWorkergainstaskKindenum (TASK, AGENT, SCHEDULED)TaskRun.taskKindcolumn + clickhouse029_add_task_kind_to_task_runs_v2Core types
sessionStreams,inputStreams,realtimeStreamsmodules in@trigger.dev/coresession-streams-api/realtime-streams-apisurfaceSessions dashboard UI (the primitive's own viewer)
/sessionsindex + detail routesSessionsTable,SessionFilters,SessionStatus,CloseSessionDialogRunFilters+TaskTriggerSourceStack